package com.xiang.ad.sender.kafka;

import com.alibaba.fastjson.JSON;
import com.xiang.ad.mysql.dto.MySqlRowData;
import com.xiang.ad.sender.ISender;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * Created by xiang.
 * 投递增量数据到kafka
 * MysqlRowData经过Json序列化之后投放到kafka中
 * 其他的服务可以监听kafka的topic来获取binlog增量数据。
 */
@Component("kafkaSender")
public class KafkaSender implements ISender {

    //topic名字，其他服务监听这个名字获取binlog增量数据
    @Value("${adconf.kafka.topic}") //根据配置文件填充
    private String topic;

    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaSender(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    //进行消息投递   调用send方法  就好了
    @Override
    public void sender(MySqlRowData rowData) {

        //将MysqlRowData用json序列化的方式转化为字符串投递到mq对应的topic中
        kafkaTemplate.send(
                topic, JSON.toJSONString(rowData)
        );
    }

    //kafka监听器  这里只是做个测试使用
    //监听这个topic
    @KafkaListener(topics = {"ad-search-mysql-data"}, groupId = "ad-search")
    public void processMysqlRowData(ConsumerRecord<?, ?> record) {

        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        //如果消息存在 就消费
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            //反序列化
            MySqlRowData rowData = JSON.parseObject(
                    message.toString(),
                    MySqlRowData.class
            );
            System.out.println("kafka 处理MysqlRowData: " +
            JSON.toJSONString(rowData));//再序列化回去，复原数据，用完要负责~
        }
    }
}
